/*chunk table data structure */
#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "limits.h"
#include "adt.h"
#include "sysy_lib.h"
#include "bmap.h"
#include "net_table.h"
#include "configure.h"
#include "mem_cache.h"
#include "net_global.h"
#include "table_proto.h"
#include "volume_bh.h"
#include "../chunk/chunk_proto.h"
#include "lich_md.h"
#include "rmsnap_bh.h"
#include "ylog.h"
#include "dbg.h"

STATIC int __table_proto_snap_load(table_proto_t *table_proto);

typedef struct {
        uint64_t offset;
        void *ptr;
        int left;
} list_arg_t;

typedef struct {
        uint64_t version;
        uint32_t attr;
        uint64_t from;
        uint32_t __pad__[8];
} snap_attr_t;

typedef snap_t entry_t;

int __snap_create_from(snap_t **dst, const snap_t *src) {
        int ret;
        size_t size;
        snap_t *_dst;

        *dst = NULL;

        {
                size = sizeof(*src) + strlen(src->key) + 1
                       + CHKINFO_SIZE(src->chkinfo->repnum)
                       + CHKSTAT_SIZE(src->chkinfo->repnum);

                ret = ymalloc((void **)&_dst, size);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                memcpy(_dst, src, size);
                _dst->key = _dst->buf;
                _dst->chkinfo = (void *)&_dst->buf[strlen(src->key) + 1];
                _dst->chkstat = (void *)_dst->chkinfo + CHKINFO_SIZE(src->chkinfo->repnum);
        }

        *dst = _dst;
        return 0;
err_ret:
        return ret;
}

STATIC snap_t *__table_proto_snap_find(table_proto_t *table_proto, const char *key)
{
        snap_t *snap;
        struct list_head *pos;

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;
                if (strcmp(key, snap->key) == 0) {
                        return snap;
                }
        }

        return NULL;
}

STATIC snap_t *__table_proto_snap_findbyid(table_proto_t *table_proto, const chkid_t *chkid)
{
        snap_t *snap;
        struct list_head *pos;

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;
                if (chkid_cmp(chkid, &snap->chkinfo->id) == 0) {
                        return snap;
                }
        }

        return NULL;
}

STATIC snap_t *__table_proto_snap_findbyversion(table_proto_t *table_proto, uint64_t version)
{
        snap_t *snap;
        struct list_head *pos;

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;
                if (snap->snap_version == version) {
                        return snap;
                }
        }

        return NULL;
}

STATIC snap_t *__table_proto_snap_firstchild(table_proto_t *table_proto, uint64_t snap_version)
{
        snap_t *snap;
        struct list_head *pos;

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;
                if (snap->snap_from == snap_version) {
                        return snap;
                }
        }

        return NULL;
}

STATIC void __table_proto_snap_check(table_proto_t *table_proto)
{
        int64_t version;
        struct list_head *pos;
        snap_t *snap;

        version = -1;
        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;

                // TODO core 0 == 0
                // 1, the new one      (pos->prev)
                // 2, lich_system_root (pos)
                YASSERT(version < (int64_t)snap->snap_version);
                version = snap->snap_version;

                DBUG("file "CHKID_FORMAT" snap %s version %jd\n",
                      CHKID_ARG(&table_proto->chkid), snap->key, version);
        }
}

STATIC void __table_proto_snap_insert__(table_proto_t *table_proto, snap_t *snap)
{
        int found = 0;
        struct list_head *pos;
        snap_t *tmp;

        list_for_each_prev(pos, &table_proto->snap->snap) {
                tmp = (void *)pos;
                if (snap->snap_version > tmp->snap_version) {
                        list_add(&snap->hook, &tmp->hook);
                        found = 1;
                        break;
                }
        }

        if (!found) {
                list_add(&snap->hook, &table_proto->snap->snap);
        }
}

STATIC int __table_proto_snap_insert(table_proto_t *table_proto, const char *key,
                                     const chkinfo_t *chkinfo, uint64_t snap_version,
                                     uint64_t snap_from, int loc)
{
        int ret;
        snap_t *snap;

        snap = __table_proto_snap_find(table_proto, key);
        if (snap) {
                ret = EEXIST;
                GOTO(err_ret, ret);
        }

        ret = ymalloc((void **)&snap, sizeof(*snap) + strlen(key) + 1
                      + CHKINFO_SIZE(chkinfo->repnum)
                      + CHKSTAT_SIZE(chkinfo->repnum));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        snap->key = snap->buf;
        snap->chkinfo = (void *)&snap->buf[strlen(key) + 1];
        snap->chkstat = (void *)snap->chkinfo + CHKINFO_SIZE(chkinfo->repnum); 
        snap->snap_version = snap_version;
        snap->snap_from = snap_from;
        snap->loc = loc;
        snap->unlink = 0;
        strcpy(snap->key, key);
        CHKINFO_CP(snap->chkinfo, chkinfo);
        memset(snap->chkstat, 0x0, CHKSTAT_SIZE(chkinfo->repnum));

        __table_proto_snap_insert__(table_proto, snap);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_read_smap(table_proto_t *table_proto, char *map)
{
        int offset, size;

        size = TABLE_PROTO_SMAP_AREA;
        offset = TABLE_PROTO_SMAP;

        return __table_proto_read(table_proto->chkinfo, table_proto->chkstat,
                                  map, size, offset, NULL);
}

STATIC int __table_proto_loadsmap(table_proto_t *table_proto)
{
        int ret;
        char *buf;

        memset(&table_proto->snap->bmap, 0x0, sizeof(table_proto->snap->bmap));

        ret = ymalloc((void **)&buf, TABLE_PROTO_SMAP_AREA);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table_proto_read_smap(table_proto, buf);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        bmap_load(&table_proto->snap->bmap, buf, (TABLE_PRORO_SNAP_ITEM_COUNT / CHAR_BIT));

        DINFO("load smap of table "CHKID_FORMAT" count %u\n",
              CHKID_ARG(&table_proto->chkid), table_proto->snap->bmap.nr_one);

        return 0;
err_free:
        yfree((void **)&buf);
err_ret:
        return ret;
}

STATIC int __table_proto_write_smap(table_proto_t *table_proto, const char *map, int _size)
{
        int ret, size, offset;
        char *buf;

#ifdef HAVE_STATIC_ASSERT
        static_assert(TABLE_PROTO_SMAP_AREA  <= PAGE_SIZE, "map");
#endif

        YASSERT(_size <= TABLE_PROTO_SMAP_AREA);
        buf = mem_cache_calloc(MEM_CACHE_4K, 1);

        size = TABLE_PROTO_SMAP_AREA;
        offset = TABLE_PROTO_SMAP;

        if (_size < size) {
                memcpy(buf, map, _size);
                ret = __table_proto_write(table_proto->chkinfo, table_proto->chkstat,
                                           buf, size, offset);
        } else {
                ret = __table_proto_write(table_proto->chkinfo, table_proto->chkstat,
                                          map, size, offset);
        }
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __table_proto_snap_read(table_proto_t *table_proto, char *buf, int from, int to, int *count)
{
        int ret, size, newsize;

        YASSERT(from * TABLE_PRORO_SNAP_ITEM_SIZE + TABLE_PROTO_ITEM
                < (int)LICH_CHUNK_SPLIT);

        size = (to - from) * TABLE_PRORO_SNAP_ITEM_SIZE;
        ret =  __table_proto_read(table_proto->chkinfo, table_proto->chkstat, buf, size,
                                  from * TABLE_PRORO_SNAP_ITEM_SIZE + TABLE_PROTO_SNAP, &newsize);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        YASSERT(from * TABLE_PRORO_SNAP_ITEM_SIZE + TABLE_PROTO_ITEM
                + ret <= (int)LICH_CHUNK_SPLIT);

        *count = newsize / TABLE_PRORO_SNAP_ITEM_SIZE;

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_loaditem(table_proto_t *table_proto, const char *buf,
                                       int from, int _to)
{
        int ret, loc, to;
        const char *key;
        const chkinfo_t *chkinfo;
        const snap_attr_t *attr;

        YASSERT(from < (int)table_proto->snap->bmap.size);
        to = _to > (int)table_proto->snap->bmap.size ? (int)table_proto->snap->bmap.size : _to;

        for (loc = from; loc < to; loc++) {
                if (bmap_get(&table_proto->snap->bmap, loc) == 0)
                        continue;

                key = (void *)buf + (loc - from) * TABLE_PRORO_SNAP_ITEM_SIZE;
                attr = (void *)key + TABLE_PRORO_SNAP_ITEM_SIZE / 2;
                chkinfo = (void *)attr + sizeof(*attr);

                //DBUG("load value '%s' at loc %u offset %u\n", value, loc, offset +  i * table_proto->item_size);
                //DBUG("real loc %u\n", offset + TABLE_PROTO_SMAP_AREA + i * table_proto->item_size);

                YASSERT(strlen(key));

                DBUG("insert %s  into "CHKID_FORMAT" loc %u, snap_version %ju\n",
                      key, CHKID_ARG(&table_proto->chkinfo->id), loc, attr->version);
                //CHKINFO_DUMP(chkinfo, D_INFO);
                YASSERT(chkinfo->id.type == __VOLUME_CHUNK__);

                ret = __table_proto_snap_insert(table_proto, key, chkinfo,
                                attr->version, attr->from, loc);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        __table_proto_snap_check(table_proto);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_load__(table_proto_t *table_proto)
{
        int ret, loc, buf_count, count;
        char buf[MAX_BUF_LEN];

        DBUG("load table "CHKID_FORMAT"\n", CHKID_ARG(&table_proto->chkid));

        loc = 0;
        buf_count = MAX_BUF_LEN / TABLE_PRORO_SNAP_ITEM_SIZE;
        while (loc < (int)table_proto->snap->bmap.size) {
                if (bmap_range_empty(&table_proto->snap->bmap, loc, loc + buf_count)) {
                        loc += buf_count;
                        continue;
                }

                ret = __table_proto_snap_read(table_proto, buf, loc, loc + buf_count, &count);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                //DBUG("load table %s offset %u size %u\n",
                //CHKID_ARG(&table_proto->chkid), offset + TABLE_PROTO_SMAP_AREA, ret);

                if (count == 0)
                        break;

                ret = __table_proto_snap_loaditem(table_proto, buf, loc, loc + count);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                loc += count;
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_commit(table_proto_t *table_proto, const void *item,
                                     int loc, int newitem)
{
        int ret;

        ret =  __table_proto_write(table_proto->chkinfo, table_proto->chkstat, item,
                                   TABLE_PRORO_SNAP_ITEM_SIZE,
                                   loc * TABLE_PRORO_SNAP_ITEM_SIZE + TABLE_PROTO_SNAP);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (newitem) {
                ret = bmap_set(&table_proto->snap->bmap, loc);
                if (unlikely(ret))
                        YASSERT(0);

                DBUG("commit at loc %u offset %u\n", loc,
                     loc * TABLE_PRORO_SNAP_ITEM_SIZE);

                ret = __table_proto_write_smap(table_proto, table_proto->snap->bmap.bits,
                                              (TABLE_PRORO_SNAP_ITEM_COUNT / CHAR_BIT));
                if (unlikely(ret)) {
                        bmap_del(&table_proto->snap->bmap, loc);
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_create(table_proto_t *table_proto, const char *key,
                                  const chkinfo_t *chkinfo, uint64_t snap_version,
                                  uint64_t snap_from)
{
        int ret, loc;
        entry_t *ent;
        char item[TABLE_PRORO_SNAP_ITEM_SIZE];
        snap_attr_t attr;

        if (table_proto->snap->bmap.nr_one >= TABLE_PRORO_SNAP_MAX) {
                ret = EDQUOT;
                GOTO(err_ret, ret);
        }

        YASSERT(chkinfo->id.type == __VOLUME_CHUNK__);

        memset(&attr, 0x0, sizeof(attr));

        strcpy(item, key);
        attr.version = snap_version;
        attr.from = snap_from;
        memcpy(item + TABLE_PRORO_SNAP_ITEM_SIZE / 2, &attr, sizeof(attr));
        CHKINFO_CP(item + TABLE_PRORO_SNAP_ITEM_SIZE / 2 + sizeof(attr), chkinfo);

        ent = __table_proto_snap_find(table_proto, key);
        if (ent) {
                ret = EEXIST;
                GOTO(err_ret, ret);
        }

        loc = bmap_get_empty(&table_proto->snap->bmap);
        if (loc == -1) {
                ret = ENOSPC;
                GOTO(err_ret, ret);
        }

        ret = __table_proto_snap_commit(table_proto, item, loc, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table_proto_snap_insert(table_proto, key, chkinfo,
                        snap_version, snap_from, loc);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __table_proto_snap_check(table_proto);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_get(table_proto_t *table_proto, const char *key,
                                  chkinfo_t *chkinfo, uint64_t *snap_version)
{
        int ret;
        entry_t *ent;

        ent = __table_proto_snap_find(table_proto,  key);
        if (ent == NULL) {
                ret = ENOKEY;
                GOTO(err_ret, ret);
        }

        if (chkinfo)
                CHKINFO_CP(chkinfo, ent->chkinfo);
        if (snap_version)
                *snap_version = ent->snap_version;

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_remove(table_proto_t *table_proto, const char *key)
{
        int ret;
        entry_t *ent;

        ent = __table_proto_snap_find(table_proto,  key);
        if (ent == NULL) {
                ret = ENOKEY;
                GOTO(err_ret, ret);
        }

        ret = bmap_del(&table_proto->snap->bmap, ent->loc);
        if (unlikely(ret))
                YASSERT(0);

        ret = __table_proto_write_smap(table_proto, table_proto->snap->bmap.bits,
                                       (TABLE_PRORO_SNAP_ITEM_COUNT / CHAR_BIT));
        if (unlikely(ret)) {
                bmap_set(&table_proto->snap->bmap, ent->loc);
                GOTO(err_ret, ret);
        }

        list_del(&ent->hook);
        yfree((void **)&ent);

        DINFO("table "CHKID_FORMAT" remove %s snap %u\n",
              CHKID_ARG(&table_proto->chkid), key, table_proto->snap->bmap.nr_one);

        return 0;
err_ret:
        return ret;        
}

STATIC int __table_proto_snap_update__(table_proto_t *table_proto, int loc, const char *key,
                                       uint64_t version, uint64_t from, const chkinfo_t *chkinfo)
{
        int ret;
        char item[TABLE_PRORO_SNAP_ITEM_SIZE];
        snap_attr_t attr;

        memset(&attr, 0x0, sizeof(attr));
        attr.version = version;
        attr.from = from;
        strcpy(item, key);
        memcpy(item + TABLE_PRORO_SNAP_ITEM_SIZE / 2, &attr, sizeof(attr));
        CHKINFO_CP(item + TABLE_PRORO_SNAP_ITEM_SIZE / 2 + sizeof(attr), chkinfo);

        ret = __table_proto_snap_commit(table_proto, item, loc, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_rename(table_proto_t *table_proto, const char *key, const char *to)
{
        int ret;
        entry_t *ent;

        ent = __table_proto_snap_find(table_proto,  to);
        if (ent) {
                ret = EBUSY;
                GOTO(err_ret, ret);
        }

        ent = __table_proto_snap_find(table_proto,  key);
        if (ent == NULL) {
                ret = ENOKEY;
                GOTO(err_ret, ret);
        }

        ret = __table_proto_snap_update__(table_proto, ent->loc, to,
                                          ent->snap_version, ent->snap_from, ent->chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table_proto_snap_insert(table_proto, to, ent->chkinfo,
                        ent->snap_version, ent->snap_from, ent->loc);
        if (unlikely(ret)) {
                YASSERT(ret != EEXIST);
                GOTO(err_ret, ret);
        }

        list_del(&ent->hook);
        yfree((void **)&ent);

        __table_proto_snap_check(table_proto);

        DINFO("table "CHKID_FORMAT" rename %s to %s, left %u\n",
              CHKID_ARG(&table_proto->chkid), key, to, table_proto->snap->bmap.nr_one);

        return 0;
err_ret:
        return ret;        
}

STATIC int __table_proto_snap_updateparent(table_proto_t *table_proto, const char *key, const uint64_t from)
{
        int ret;
        entry_t *ent;

        ent = __table_proto_snap_find(table_proto,  key);
        if (ent == NULL) {
                ret = ENOKEY;
                GOTO(err_ret, ret);
        }

        ret = __table_proto_snap_update__(table_proto, ent->loc, key,
                                          ent->snap_version, from, ent->chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_del(&ent->hook);

        ret = __table_proto_snap_insert(table_proto, key, ent->chkinfo,
                        ent->snap_version, from, ent->loc);
        if (unlikely(ret)) {
                YASSERT(ret != EEXIST);
                GOTO(err_ret, ret);
        }

        __table_proto_snap_check(table_proto);

        DINFO("table "CHKID_FORMAT" update %s parent to %ld, left %u\n",
              CHKID_ARG(&table_proto->chkid), key, from, table_proto->snap->bmap.nr_one);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_list(table_proto_t *table_proto, const char *pool, uint64_t offset, void *_buf,
                               int *_buflen)
{
        int ret, reclen, left;
        struct dirent *de;
        struct list_head *pos;
        snap_t *snap;
        char info[MAX_INFO_LEN];

        (void) offset;

        de = _buf;
        left = *_buflen;
        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;
                snprintf(info, MAX_INFO_LEN, "%s %ld %ld",
                                snap->key, snap->snap_version, snap->snap_from);
                reclen = sizeof(*de) + strlen(info) + 1;
                if (left < (reclen + (int)sizeof(*de))) {
                        ret = ENOMEM;
                        GOTO(err_ret, ret);
                }

                DBUG("snap %s snap_version %ld snap_from %ld\n",
                                snap->key, snap->snap_version, snap->snap_from);

                if (is_deleted_snap(snap->key) && snap->unlink == 0) {
                        snap->unlink = 1;
                        //volume_bh_snapcleanup(&table_proto->chkid, snap->key);
                        ret = rmsnap_bh_create(pool, &table_proto->chkid, snap->key);
                        if (ret) {
                                if (ret != EEXIST)
                                        snap->unlink = 0;
                                DWARN("rmsnap_bh_create "CHKID_FORMAT" %s failed ret:%d\n",
                                                CHKID_ARG(&table_proto->chkid), snap->key, ret);
                        }
                }

                de->d_reclen = reclen;
                strcpy(de->d_name, info);
                left -= de->d_reclen;
                de = (void *)de + reclen;
        }

        reclen = (sizeof(*de) + 1);
        memset(de, 0x0, sizeof(*de));
        de->d_name[0] = '\0';
        de->d_reclen = reclen;
        left -= reclen;

        *_buflen = *_buflen - left;

        DINFO("buflen %u\n", *_buflen);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_iterator(table_proto_t *table_proto, func2_t func2, void *_arg)
{
        struct list_head *pos;
        snap_t *snap;

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;

                DINFO("scan "CHKID_FORMAT" --> "CHKID_FORMAT"\n",
                      CHKID_ARG(&table_proto->chkid), CHKID_ARG(&snap->chkinfo->id));
                func2(_arg, &table_proto->chkid, snap->chkinfo);
        }

        return 0;
}

STATIC int __table_proto_snap_unintact(table_proto_t *table_proto, func3_t func3, void *_arg)
{
        struct list_head *pos;
        snap_t *snap;

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;

                DINFO("scan "CHKID_FORMAT" --> "CHKID_FORMAT"\n",
                      CHKID_ARG(&table_proto->chkid), CHKID_ARG(&snap->chkinfo->id));
                func3(_arg, &table_proto->chkid, snap->chkinfo, NULL);
        }

        return 0;
}

STATIC int __table_proto_snap_iterator2(table_proto_t *table_proto, func_int2_t func2, void *_arg)
{
        int ret;
        struct list_head *pos;
        snap_t *snap;

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;

                DINFO("scan "CHKID_FORMAT" --> "CHKID_FORMAT"\n",
                      CHKID_ARG(&table_proto->chkid), CHKID_ARG(&snap->chkinfo->id));
                ret = func2(_arg, &table_proto->chkid, snap);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_load(table_proto_t *table_proto)
{
        int ret;

        ret = __table_proto_loadsmap(table_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table_proto_snap_load__(table_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_last(table_proto_t *table_proto, nid_t *snapnid, fileid_t *fileid,
                                   char *name, uint64_t *snap_version)
{
        int ret;
        snap_t *snap;
        struct list_head *pos;
        uint64_t last = 0;

        if (table_proto->snap->bmap.nr_one == 0) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;
                if (snap->snap_version >= last) {
                        last = snap->snap_version;

                        if (fileid)
                                *fileid = snap->chkinfo->id;
                        if (snap_version)
                                *snap_version = snap->snap_version;
                        if (snapnid)
                                *snapnid = snap->chkinfo->diskid[0].id;
                        if (name)
                                strcpy(name, snap->key);
                }
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_last_version(table_proto_t *table_proto, uint64_t *snap_version)
{
        nid_t snapnid;
        fileid_t snapid;

        return __table_proto_snap_last(table_proto, &snapnid, &snapid, NULL, snap_version);
}

STATIC int __table_proto_snap_usable_version(table_proto_t *table_proto, uint64_t *snap_version)
{
        int ret, found;
        snap_t *snap;
        struct list_head *pos;
        uint64_t ver = 0;

        if (table_proto->snap->bmap.nr_one == 0) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        while (ver != (uint64_t)-1) {
                found = 0;
                list_for_each(pos, &table_proto->snap->snap) {
                        snap = (void *)pos;
                        if (snap->snap_version == ver) {
                                found = 1;
                                break;
                        }
                }

                if (found) {
                        ver++;
                        continue;
                } else {
                        *snap_version = ver;
                        break;
                }
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_getinfo(table_proto_t *table_proto, const fileid_t *fileid,
                                      chkinfo_t *chkinfo)
{
        int ret;
        entry_t *ent;

        ent  = __table_proto_snap_findbyid(table_proto, fileid);
        if (ent == NULL) {
                ret = ENOENT;
                DWARN("lookup "CHKID_FORMAT" (%u) %s\n", CHKID_ARG(fileid), ret, strerror(ret))
                GOTO(err_ret, ret);
        }

        CHKINFO_CP(chkinfo, ent->chkinfo);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_setinfo(table_proto_t *table_proto, const fileid_t *fileid,
                                      const chkinfo_t *chkinfo, uint64_t info_version)
{
        int ret;
        entry_t *ent;

        YASSERT(chkinfo->id.type == __VOLUME_CHUNK__);
        ent  = __table_proto_snap_findbyid(table_proto, fileid);
        if (ent == NULL) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        if (ent->chkinfo->info_version != info_version) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }
                
        if (ent->chkinfo->info_version == chkinfo->info_version) {
                return 0;
        }

        ret = __table_proto_snap_update__(table_proto, ent->loc, ent->key,
                                          ent->snap_version, ent->snap_from, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        CHKINFO_CP(ent->chkinfo, chkinfo);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_next(table_proto_t *table_proto, const fileid_t *from,
                fileid_t *fileid, char *name, uint64_t *snap_version)
{
        int ret;
        snap_t *next = NULL, *snap;

        snap = __table_proto_snap_findbyid(table_proto, from);
        if (snap == NULL) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        next = __table_proto_snap_firstchild(table_proto, snap->snap_version);
        if (next == NULL) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        DINFO("current %ju, next %ju\n", snap->snap_version, next->snap_version);

        *fileid = next->chkinfo->id;
        if (name)
                strcpy(name, next->key);
        if (snap_version)
                *snap_version = next->snap_version;

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_haschild(table_proto_t *table_proto, uint64_t snap_version)
{
        snap_t *snap = NULL;

        snap = __table_proto_snap_firstchild(table_proto, snap_version);
        if (snap == NULL) {
                return 0;
        }

        return 1;
}

STATIC int __table_proto_snap_listchild(table_proto_t *table_proto, uint64_t snap_from,
                struct list_head *list)
{
        int ret, size;
        struct list_head *pos;
        snap_t *snap, *child;

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;

                if (snap->snap_from == snap_from) {
                        DINFO("list child snap_version:%lu --> "CHKID_FORMAT"\n",
                                        snap_from, CHKID_ARG(&snap->chkinfo->id));

                        size = sizeof(*snap) + strlen(snap->key) + 1
                                      + CHKINFO_SIZE(snap->chkinfo->repnum)
                                      + CHKSTAT_SIZE(snap->chkinfo->repnum);
                        ret = ymalloc((void **)&child, size);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        memcpy(child, snap, size);
                        child->key = child->buf;
                        child->chkinfo = (void *)&child->buf[strlen(snap->key) + 1];
                        child->chkstat = (void *)child->chkinfo + CHKINFO_SIZE(snap->chkinfo->repnum);

                        list_add_tail(&child->hook, list);
                }
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_listdescendant(table_proto_t *table_proto, uint64_t snap_from,
                                             struct list_head *list)
{
        snap_t *snap, *child, *tmp;
        uint64_t snap_version = snap_from;

        while (1) {
                snap = __table_proto_snap_findbyversion(table_proto, snap_version);
                if (snap == NULL) {
                        break;
                }

                __snap_create_from(&tmp, snap);

                list_add_tail(&tmp->hook, list);

                // next
                child = __table_proto_snap_firstchild(table_proto, snap_version);
                if (child == NULL) {
                        break;
                }

                snap_version = child->snap_version;
        }

        return 0;
}

STATIC int __table_proto_snap_listparent(table_proto_t *table_proto, uint64_t snap_version,
                struct list_head *list)
{
        int ret, size, found;
        uint64_t snap_parent = snap_version;
        struct list_head *pos;
        snap_t *snap, *parent;

        while(snap_parent != (uint64_t) -1) {
                found = 0;

                list_for_each(pos, &table_proto->snap->snap) {
                        snap = (void *)pos;

                        if (snap->snap_version == snap_parent) {
                                DBUG("list parent snap_version:%ld --> "CHKID_FORMAT" parent:%ld\n",
                                      snap_parent, CHKID_ARG(&snap->chkinfo->id), snap->snap_from);

                                size = sizeof(*snap) + strlen(snap->key) + 1
                                              + CHKINFO_SIZE(snap->chkinfo->repnum)
                                              + CHKSTAT_SIZE(snap->chkinfo->repnum);
                                ret = ymalloc((void **)&parent, size);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                memcpy(parent, snap, size);
                                parent->key = parent->buf;
                                parent->chkinfo = (void *)&parent->buf[strlen(snap->key) + 1];
                                parent->chkstat = (void *)parent->chkinfo + CHKINFO_SIZE(snap->chkinfo->repnum);

                                list_add(&parent->hook, list);
                                snap_parent = snap->snap_from;
                                found = 1;
                                break;
                        }
                }

                if (!found) {
                        ret = ENOKEY;
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_prev(table_proto_t *table_proto, const fileid_t *from,
                fileid_t *fileid, char *name, uint64_t *snap_version)
{
        int ret;
        snap_t *prev = NULL, *snap;

        snap = __table_proto_snap_findbyid(table_proto, from);
        if (snap == NULL) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        prev = __table_proto_snap_findbyversion(table_proto, snap->snap_from);
        if (prev == NULL) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        DINFO("current %ju, prev %ju\n", snap->snap_version, prev->snap_version);

        if (fileid)
                *fileid = prev->chkinfo->id;
        if (name)
                strcpy(name, prev->key);
        if (snap_version)
                *snap_version = prev->snap_version;

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_getbyversion(table_proto_t *table_proto, uint64_t snap_version,
                chkinfo_t *chkinfo, char *name)
{
        int ret;
        snap_t *snap;

        snap = __table_proto_snap_findbyversion(table_proto, snap_version);
        if (snap == NULL) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        if (chkinfo) {
                CHKINFO_CP(chkinfo, snap->chkinfo);
        }

        if (name) {
                strcpy(name, snap->key);
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_snap_getversion(table_proto_t *table_proto,
                                           const chkid_t *chkid, uint64_t *snap_version)
{
        int ret;
        snap_t *snap;

        snap = __table_proto_snap_findbyid(table_proto, chkid);
        if (snap == NULL) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        *snap_version = snap->snap_version;

        return 0;
err_ret:
        return ret;
}

static inline int __is_system_snap(const char *name) {
        if (memcmp(name, LICH_SYSTEM, strlen(LICH_SYSTEM)) == 0) {
                return 1;
        }

        return 0;
}

/**
 * if there is no user snapshot on the volume,
 * then set empty = 1.
 **/
STATIC int __table_proto_snap_isempty(table_proto_t *table_proto, int *empty)
{
        snap_t *snap;
        struct list_head *pos;

        *empty = 1;
        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;
                if (!__is_system_snap(snap->key)) {
                        *empty = 0;
                        break;
                }
        }

        return 0;
}

/**
 * if there has some snap in deleting,
 * then set deleting = 1.
 **/
STATIC int __table_proto_snap_isdeleting(table_proto_t *table_proto, int *deleting)
{
        snap_t *snap;
        struct list_head *pos;

        *deleting = 0;
        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;
                if (is_deleted_snap(snap->key)) {
                        *deleting = 1;
                        break;
                }
        }

        return 0;
}

STATIC int __table_proto_snap_count(IN table_proto_t *table_proto, IN int snap_type, OUT int *count)
{
        snap_t *snap;
        struct list_head *pos;
        int snap_system_count = 0, snap_user_count = 0, ret = 0;

        if (snap_type == SNAP_ALL) {
                *count = table_proto->snap->bmap.nr_one;
                return 0;
        }

        list_for_each(pos, &table_proto->snap->snap) {
                snap = (void *)pos;
                if (__is_system_snap(snap->key)) {
                        snap_system_count ++;
                } else {
                        snap_user_count ++;
                }
        }

        switch (snap_type) {
        case SNAP_USER:
                *count = snap_user_count;
                break;
        case SNAP_SYSTEM:
                *count = snap_system_count;
                break;
        default:
                ret = EINVAL;
                DERROR("error snap type :%d\n", snap_type);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int table_snap_init(table_proto_t *table_proto)
{
        int ret;

        ret = ymalloc((void **)&table_proto->snap, sizeof(*table_proto->snap));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        table_proto->snap->create = __table_proto_snap_create;
        table_proto->snap->get_byname = __table_proto_snap_get;
        table_proto->snap->remove = __table_proto_snap_remove;
        table_proto->snap->rename = __table_proto_snap_rename;
        table_proto->snap->updateparent = __table_proto_snap_updateparent;
        table_proto->snap->list = __table_proto_snap_list;
        table_proto->snap->unintact = __table_proto_snap_unintact;
        table_proto->snap->iterator = __table_proto_snap_iterator;
        table_proto->snap->iterator2 = __table_proto_snap_iterator2;
        table_proto->snap->last = __table_proto_snap_last;
        table_proto->snap->last_version = __table_proto_snap_last_version;
        table_proto->snap->usable_version = __table_proto_snap_usable_version;
        table_proto->snap->getinfo = __table_proto_snap_getinfo;
        table_proto->snap->setinfo = __table_proto_snap_setinfo;
        table_proto->snap->has_child = __table_proto_snap_haschild;
        table_proto->snap->list_child = __table_proto_snap_listchild;
        table_proto->snap->list_descendant = __table_proto_snap_listdescendant;
        table_proto->snap->list_parent = __table_proto_snap_listparent;
        table_proto->snap->next = __table_proto_snap_next;
        table_proto->snap->prev = __table_proto_snap_prev;
        table_proto->snap->get_version = __table_proto_snap_getversion;
        table_proto->snap->get_byversion = __table_proto_snap_getbyversion;
        table_proto->snap->is_empty = __table_proto_snap_isempty;
        table_proto->snap->is_deleting = __table_proto_snap_isdeleting;
        table_proto->snap->count = __table_proto_snap_count;
        INIT_LIST_HEAD(&table_proto->snap->snap);

        ret = __table_proto_snap_load(table_proto);
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        table_snap_destroy(table_proto);
err_ret:
        return ret;        
}

void table_snap_destroy(table_proto_t *table_proto)
{
        struct list_head *pos, *n;

        if (table_proto->snap) {
                list_for_each_safe(pos, n, &table_proto->snap->snap) {
                        list_del(pos);
                        yfree((void **)&pos);
                }

                bmap_destroy(&table_proto->snap->bmap);
                yfree((void **)&table_proto->snap);
                table_proto->snap = NULL;
        }
}
